1   package org.apache.lucene.replicator;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements.  See the NOTICE file distributed with
6    * this work for additional information regarding copyright ownership.
7    * The ASF licenses this file to You under the Apache License, Version 2.0
8    * (the "License"); you may not use this file except in compliance with
9    * the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  import java.io.IOException;
21  import java.util.Collections;
22  import java.util.HashSet;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Set;
26  import java.util.concurrent.Callable;
27  import java.util.regex.Matcher;
28  
29  import org.apache.lucene.index.DirectoryReader;
30  import org.apache.lucene.index.IndexCommit;
31  import org.apache.lucene.index.IndexFileNames;
32  import org.apache.lucene.index.IndexNotFoundException;
33  import org.apache.lucene.index.IndexWriter;
34  import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler;
35  import org.apache.lucene.store.Directory;
36  import org.apache.lucene.store.IOContext;
37  import org.apache.lucene.util.IOUtils;
38  import org.apache.lucene.util.InfoStream;
39  
40  /**
41   * A {@link ReplicationHandler} for replication of an index. Implements
42   * {@link #revisionReady} by copying the files pointed by the client resolver to
43   * the index {@link Directory} and then touches the index with
44   * {@link IndexWriter} to make sure any unused files are deleted.
45   * <p>
46   * <b>NOTE:</b> this handler assumes that {@link IndexWriter} is not opened by
47   * another process on the index directory. In fact, opening an
48   * {@link IndexWriter} on the same directory to which files are copied can lead
49   * to undefined behavior, where some or all the files will be deleted, override
50   * other files or simply create a mess. When you replicate an index, it is best
51   * if the index is never modified by {@link IndexWriter}, except the one that is
52   * open on the source index, from which you replicate.
53   * <p>
54   * This handler notifies the application via a provided {@link Callable} when an
55   * updated index commit was made available for it.
56   * 
57   * @lucene.experimental
58   */
59  public class IndexReplicationHandler implements ReplicationHandler {
60    
61    /**
62     * The component used to log messages to the {@link InfoStream#getDefault()
63     * default} {@link InfoStream}.
64     */
65    public static final String INFO_STREAM_COMPONENT = "IndexReplicationHandler";
66    
67    private final Directory indexDir;
68    private final Callable<Boolean> callback;
69    
70    private volatile Map<String,List<RevisionFile>> currentRevisionFiles;
71    private volatile String currentVersion;
72    private volatile InfoStream infoStream = InfoStream.getDefault();
73    
74    /**
75     * Returns the last {@link IndexCommit} found in the {@link Directory}, or
76     * {@code null} if there are no commits.
77     */
78    public static IndexCommit getLastCommit(Directory dir) throws IOException {
79      try {
80        if (DirectoryReader.indexExists(dir)) {
81          List<IndexCommit> commits = DirectoryReader.listCommits(dir);
82          // listCommits guarantees that we get at least one commit back, or
83          // IndexNotFoundException which we handle below
84          return commits.get(commits.size() - 1);
85        }
86      } catch (IndexNotFoundException e) {
87        // ignore the exception and return null
88      }
89      return null;
90    }
91    
92    /**
93     * Verifies that the last file is segments_N and fails otherwise. It also
94     * removes and returns the file from the list, because it needs to be handled
95     * last, after all files. This is important in order to guarantee that if a
96     * reader sees the new segments_N, all other segment files are already on
97     * stable storage.
98     * <p>
99     * The reason why the code fails instead of putting segments_N file last is
100    * that this indicates an error in the Revision implementation.
101    */
102   public static String getSegmentsFile(List<String> files, boolean allowEmpty) {
103     if (files.isEmpty()) {
104       if (allowEmpty) {
105         return null;
106       } else {
107         throw new IllegalStateException("empty list of files not allowed");
108       }
109     }
110     
111     String segmentsFile = files.remove(files.size() - 1);
112     if (!segmentsFile.startsWith(IndexFileNames.SEGMENTS) || segmentsFile.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
113       throw new IllegalStateException("last file to copy+sync must be segments_N but got " + segmentsFile
114           + "; check your Revision implementation!");
115     }
116     return segmentsFile;
117   }
118 
119   /**
120    * Cleanup the index directory by deleting all given files. Called when file
121    * copy or sync failed.
122    */
123   public static void cleanupFilesOnFailure(Directory dir, List<String> files) {
124     for (String file : files) {
125       // suppress any exception because if we're here, it means copy
126       // failed, and we must cleanup after ourselves.
127       IOUtils.deleteFilesIgnoringExceptions(dir, file);
128     }
129   }
130   
131   /**
132    * Cleans up the index directory from old index files. This method uses the
133    * last commit found by {@link #getLastCommit(Directory)}. If it matches the
134    * expected segmentsFile, then all files not referenced by this commit point
135    * are deleted.
136    * <p>
137    * <b>NOTE:</b> this method does a best effort attempt to clean the index
138    * directory. It suppresses any exceptions that occur, as this can be retried
139    * the next time.
140    */
141   public static void cleanupOldIndexFiles(Directory dir, String segmentsFile, InfoStream infoStream) {
142     try {
143       IndexCommit commit = getLastCommit(dir);
144       // commit == null means weird IO errors occurred, ignore them
145       // if there were any IO errors reading the expected commit point (i.e.
146       // segments files mismatch), then ignore that commit either.
147       if (commit != null && commit.getSegmentsFileName().equals(segmentsFile)) {
148         Set<String> commitFiles = new HashSet<>();
149         commitFiles.addAll(commit.getFileNames());
150         Matcher matcher = IndexFileNames.CODEC_FILE_PATTERN.matcher("");
151         for (String file : dir.listAll()) {
152           if (!commitFiles.contains(file)
153               && (matcher.reset(file).matches() || file.startsWith(IndexFileNames.SEGMENTS))) {
154             // suppress exceptions, it's just a best effort
155             IOUtils.deleteFilesIgnoringExceptions(dir, file);
156           }
157         }
158       }
159     } catch (Throwable t) {
160       // ignore any errors that happen during this state and only log it. this
161       // cleanup will have a chance to succeed the next time we get a new
162       // revision.
163       if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
164         infoStream.message(INFO_STREAM_COMPONENT, "cleanupOldIndexFiles(): failed on error " + t.getMessage());
165       }
166     }
167   }
168   
169   /**
170    * Copies the files from the source directory to the target one, if they are
171    * not the same.
172    */
173   public static void copyFiles(Directory source, Directory target, List<String> files) throws IOException {
174     if (!source.equals(target)) {
175       for (String file : files) {
176         target.copyFrom(source, file, file, IOContext.READONCE);
177       }
178     }
179   }
180 
181   /**
182    * Constructor with the given index directory and callback to notify when the
183    * indexes were updated.
184    */
185   public IndexReplicationHandler(Directory indexDir, Callable<Boolean> callback) throws IOException {
186     this.callback = callback;
187     this.indexDir = indexDir;
188     currentRevisionFiles = null;
189     currentVersion = null;
190     if (DirectoryReader.indexExists(indexDir)) {
191       final List<IndexCommit> commits = DirectoryReader.listCommits(indexDir);
192       final IndexCommit commit = commits.get(commits.size() - 1);
193       currentRevisionFiles = IndexRevision.revisionFiles(commit);
194       currentVersion = IndexRevision.revisionVersion(commit);
195       final InfoStream infoStream = InfoStream.getDefault();
196       if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
197         infoStream.message(INFO_STREAM_COMPONENT, "constructor(): currentVersion=" + currentVersion
198             + " currentRevisionFiles=" + currentRevisionFiles);
199         infoStream.message(INFO_STREAM_COMPONENT, "constructor(): commit=" + commit);
200       }
201     }
202   }
203   
204   @Override
205   public String currentVersion() {
206     return currentVersion;
207   }
208   
209   @Override
210   public Map<String,List<RevisionFile>> currentRevisionFiles() {
211     return currentRevisionFiles;
212   }
213   
214   @Override
215   public void revisionReady(String version, Map<String,List<RevisionFile>> revisionFiles,
216       Map<String,List<String>> copiedFiles, Map<String,Directory> sourceDirectory) throws IOException {
217     if (revisionFiles.size() > 1) {
218       throw new IllegalArgumentException("this handler handles only a single source; got " + revisionFiles.keySet());
219     }
220     
221     Directory clientDir = sourceDirectory.values().iterator().next();
222     List<String> files = copiedFiles.values().iterator().next();
223     String segmentsFile = getSegmentsFile(files, false);
224     String pendingSegmentsFile = "pending_" + segmentsFile;
225     
226     boolean success = false;
227     try {
228       // copy files from the client to index directory
229       copyFiles(clientDir, indexDir, files);
230       
231       // fsync all copied files (except segmentsFile)
232       indexDir.sync(files);
233       
234       // now copy and fsync segmentsFile as pending, then rename (simulating lucene commit)
235       indexDir.copyFrom(clientDir, segmentsFile, pendingSegmentsFile, IOContext.READONCE);
236       indexDir.sync(Collections.singletonList(pendingSegmentsFile));
237       indexDir.renameFile(pendingSegmentsFile, segmentsFile);
238       
239       success = true;
240     } finally {
241       if (!success) {
242         files.add(segmentsFile); // add it back so it gets deleted too
243         files.add(pendingSegmentsFile);
244         cleanupFilesOnFailure(indexDir, files);
245       }
246     }
247 
248     // all files have been successfully copied + sync'd. update the handler's state
249     currentRevisionFiles = revisionFiles;
250     currentVersion = version;
251     
252     if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
253       infoStream.message(INFO_STREAM_COMPONENT, "revisionReady(): currentVersion=" + currentVersion
254           + " currentRevisionFiles=" + currentRevisionFiles);
255     }
256     
257     // Cleanup the index directory from old and unused index files.
258     // NOTE: we don't use IndexWriter.deleteUnusedFiles here since it may have
259     // side-effects, e.g. if it hits sudden IO errors while opening the index
260     // (and can end up deleting the entire index). It is not our job to protect
261     // against those errors, app will probably hit them elsewhere.
262     cleanupOldIndexFiles(indexDir, segmentsFile, infoStream);
263 
264     // successfully updated the index, notify the callback that the index is
265     // ready.
266     if (callback != null) {
267       try {
268         callback.call();
269       } catch (Exception e) {
270         throw new IOException(e);
271       }
272     }
273   }
274 
275   /** Sets the {@link InfoStream} to use for logging messages. */
276   public void setInfoStream(InfoStream infoStream) {
277     if (infoStream == null) {
278       infoStream = InfoStream.NO_OUTPUT;
279     }
280     this.infoStream = infoStream;
281   }
282   
283 }